-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Tighten sequence numbers recovery #22212
Conversation
4402935
to
54f8aba
Compare
retest this please |
1 similar comment
retest this please |
This commit touches addresses issues related to recovery and sequence numbers: - A sequence number can be assigned and a Lucene commit created with a maximum sequence number at least as large as that sequence number, yet the operation corresponding to that sequence number can be missing from both the Lucene commit and the translog. This means that upon recovery the local checkpoint will be stuck at or below this missing sequence number. To address this, we force the local checkpoint to the maximum sequence number in the Lucene commit when opening the engine. Note that there can still be gaps in the history in the translog but we do not address those here. - The global checkpoint is transferred to the target shard at the end of peer recovery. - Additionally, we reenable the relocation integration tests. Lastly, this work uncovered some bugs in the assignment of sequence numbers on replica operations: - setting the sequence number on replica write requests was missing, very likely introduced as a result of resolving merge conflicts - handling operations that arrive out of order on a replica and have a version conflict with a previous operation were never marked as processed
54f8aba
to
3164819
Compare
retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great. I left some suggestion. I do think there is something wrong with the inner delete method on the engine
@@ -150,6 +150,7 @@ protected WritePrimaryResult shardOperationOnPrimary(BulkShardRequest request, I | |||
final long version = indexResult.getVersion(); | |||
indexRequest.version(version); | |||
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery()); | |||
indexRequest.seqNo(indexResult.getSeqNo()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe assert at the end of this method that the seqNo is set on the replica request?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 1c71393.
seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats); | ||
// norelease | ||
/* | ||
* We have no guarantees that all operations above the local checkpoint are in the Lucene commit. These means that we there |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: "are in the lucene commit" -> "are in the lucene commit or the translog generation associated with it"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 1c71393.
seqNo = delete.seqNo(); | ||
} | ||
|
||
if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we do this twice now? I'm not sure this what you intended?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also - it feels like this can simplified based on the origin. I do wonder if the code reuse in checkVersionConflict
is worth the extra complexity it brings to reading the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops. I pushed 1c71393.
@@ -152,9 +152,6 @@ synchronized void updateCheckpointOnReplica(long globalCheckpoint) { | |||
if (this.globalCheckpoint <= globalCheckpoint) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add a comment about when the current global checkpoint can be higher? here is what I wrote in #10708
Note that the global checkpoint is a local knowledge of that is update under the mandate of the primary. It may be that the primary information is lagging compared to a replica. This can happen when a replica is promoted to a primary (but still has stale info).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 1c71393.
@@ -391,7 +391,7 @@ public void finalizeRecovery() { | |||
StopWatch stopWatch = new StopWatch().start(); | |||
logger.trace("[{}][{}] finalizing recovery to {}", indexName, shardId, request.targetNode()); | |||
cancellableThreads.execute(() -> { | |||
recoveryTarget.finalizeRecovery(); | |||
recoveryTarget.finalizeRecovery(shard.getGlobalCheckpoint()); | |||
shard.markAllocationIdAsInSync(recoveryTarget.getTargetAllocationId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
paranoia - can we flip this around and mark the target allocation as "in sync" before we give it the global checkpoint? it at least reads better as "we know you are in sync and therefore every global checkpoint advances will take you into account"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 1c71393.
* tombstone files. | ||
**/ | ||
void finalizeRecovery(); | ||
* The finalize request clears unreferenced translog files, refreshes the engine now that new segments are available, enables garbage |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is correct anymore - clean the translog files? (I know it was there before, it just caught my eye)
* tombstone files. | ||
**/ | ||
void finalizeRecovery(); | ||
* The finalize request clears unreferenced translog files, refreshes the engine now that new segments are available, enables garbage |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is correct anymore - clean the translog files? (I know it was there before, it just caught my eye)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, it was already not correct. I pushed 1c71393.
@Override | ||
public long generateSeqNo() { | ||
if (rarely()) { | ||
// force skipping a sequence number |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you evil person :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should do the test based on real world scenarios we know can happen. I think it will make things clearer?
For example, using a replica engine :
- Index a doc, with seq#1
- Delete the same doc with seq#3
- Flush
- index doc with seq#2
To do so on a primary takes something more evil - instead of skiping translog ops, make sure this come "out of order". So make the seq number service put a seq# aside but then serve it on a follow up request.
I think this also serve as documentation of when this can happen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a plan here, but it's a work-in-progress.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I presume this is still in progress? (which is fine)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed f57eb99.
} | ||
} | ||
|
||
public void testOutOfOrderSequenceNumbersWithVersionConflict() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++. Maybe also add a sanity check that a get on the doc at the end gives us what we expect? (deleted or found)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed 1c71393.
May the git push --force be with you if you require a more elaborative commit message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left some comments
} | ||
} | ||
|
||
final boolean exists = operations.get(operations.size() - 1) instanceof Engine.Index; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm... what happens if the last op is an index, but the previous op was a delete with a higher version?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This represents whether or not the last operation before shuffling, and hence the operation with the highest version, is an index or delete operation. I think that this is correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep. it's just before the shuffle. got confused. all good.
} else { | ||
final long seqNo; | ||
if (index.origin() == Operation.Origin.PRIMARY) { | ||
final boolean conflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if this is a replica and we have a version conflict and throws an exception? I think we still end up not marking the seq no as completed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about something like this?
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index e75dc47..d16c11e 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -86,6 +86,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.LongSupplier;
+import java.util.function.Supplier;
public class InternalEngine extends Engine {
@@ -444,11 +445,18 @@ public class InternalEngine extends Engine {
}
}
- private boolean checkVersionConflict(
+ /**
+ * checks for version conflicts and returns the right result object if conflict was detected. returns `null`
+ * if no conflicts was found and indexing should proceed as normal
+ */
+ private <T extends Result> T checkVersionConflict(
final Operation op,
final long currentVersion,
final long expectedVersion,
- final boolean deleted) {
+ final boolean deleted,
+ final Supplier<T> resultOnSuccess,
+ final Function<Exception, T> resultOnFailure) {
+ final T result;
if (op.versionType() == VersionType.FORCE) {
if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
// If index was created in 5.0 or later, 'force' is not allowed at all
@@ -461,15 +469,17 @@ public class InternalEngine extends Engine {
if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) {
if (op.origin().isRecovery()) {
- // version conflict, but okay
- return true;
+ // version conflict, but okay, mark as success
+ result = resultOnSuccess.get();
} else {
// fatal version conflict
- throw new VersionConflictEngineException(shardId, op.type(), op.id(),
- op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
+ result = resultOnFailure.apply(new VersionConflictEngineException(shardId, op.type(), op.id(),
+ op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted));
}
+ } else {
+ result = null;
}
- return false;
+ return result;
}
private long checkDeletedAndGCed(VersionValue versionValue) {
@@ -584,6 +594,7 @@ public class InternalEngine extends Engine {
final Translog.Location location;
final long updatedVersion;
IndexResult indexResult = null;
+ long seqNo = index.seqNo();
try (Releasable ignored = acquireLock(index.uid())) {
lastWriteNanos = index.startTime();
/* if we have an autoGeneratedID that comes into the engine we can potentially optimize
@@ -648,23 +659,18 @@ public class InternalEngine extends Engine {
}
}
final long expectedVersion = index.version();
- final boolean conflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted);
+ IndexResult result = checkVersionConflict(index, currentVersion, expectedVersion, deleted,
+ () -> new IndexResult(currentVersion, index.seqNo(), false),
+ exception -> new IndexResult(exception, currentVersion, index.seqNo()));
- final long seqNo;
- if (index.origin() == Operation.Origin.PRIMARY) {
- if (!conflict) {
+ if (result == null) {
+
+ if (index.origin() == Operation.Origin.PRIMARY) {
seqNo = seqNoService.generateSeqNo();
- } else {
- seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
- } else {
- seqNo = index.seqNo();
- }
- if (conflict) {
- // skip index operation because of version conflict on recovery
- indexResult = new IndexResult(expectedVersion, seqNo, false);
- } else {
+ index.parsedDoc().updateSeqID(seqNo, index.primaryTerm());
+
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.parsedDoc().version().setLongValue(updatedVersion);
@@ -686,8 +692,8 @@ public class InternalEngine extends Engine {
}
indexResult = new IndexResult(updatedVersion, seqNo, deleted);
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
- ? translog.add(new Translog.Index(index, indexResult))
- : null;
+ ? translog.add(new Translog.Index(index, indexResult))
+ : null;
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
indexResult.setTranslogLocation(location);
}
@@ -695,7 +701,7 @@ public class InternalEngine extends Engine {
indexResult.freeze();
return indexResult;
} finally {
- if (indexResult != null && indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+ if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
seqNoService.markSeqNoAsCompleted(indexResult.getSeqNo());
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I strengthened the test to include the replica case (it caught the issue), and incorporated your suggestion.
2dc75b9
to
3c37f4b
Compare
Thanks @bleskes, I've responded to your feedback. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! that test qualifies for the brain olympics :) left some minor comments that don't require another review
@@ -574,6 +611,7 @@ private IndexResult innerIndex(Index index) throws IOException { | |||
final Translog.Location location; | |||
final long updatedVersion; | |||
IndexResult indexResult = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit - this can be put in scope and next to where it's created and probably made final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed b9f68d4.
deleteResult.setTranslogLocation(location); | ||
} | ||
deleteResult.setTook(System.nanoTime() - delete.startTime()); | ||
deleteResult.freeze(); | ||
return deleteResult; | ||
} finally { | ||
if (deleteResult != null && deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { | ||
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) { | ||
seqNoService.markSeqNoAsCompleted(deleteResult.getSeqNo()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should use the seqNo variable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed b9f68d4.
@Override | ||
public long generateSeqNo() { | ||
if (rarely()) { | ||
// force skipping a sequence number |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I presume this is still in progress? (which is fine)
final Term uid = newUid("1"); | ||
final Document document = testDocumentWithTextField(); | ||
final AtomicLong sequenceNumber = new AtomicLong(); | ||
final Engine.Operation.Origin origin = randomFrom(PEER_RECOVERY, PRIMARY, PEER_RECOVERY); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you meant one of this to be translog?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed b9f68d4.
if (deleteResult != null && deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { | ||
seqNoService.markSeqNoAsCompleted(deleteResult.getSeqNo()); | ||
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) { | ||
seqNoService().markSeqNoAsCompleted(deleteResult.getSeqNo()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should use the seqNo variable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pushed b9f68d4. Annoying I caught that in the innerIndex
but not here 😞.
Thanks @bleskes. I'll let CI have a go at this before merging. |
My CI passes, but the PR CI build here is struggling with the checkout problem that we've been seeing. I'm going to merge and will watch our regular CI. |
This commit touches addresses issues related to recovery and sequence numbers:
maximum sequence number at least as large as that sequence number,
yet the operation corresponding to that sequence number can be
missing from both the Lucene commit and the translog. This means that
upon recovery the local checkpoint will be stuck at or below this
missing sequence number. To address this, we force the local
checkpoint to the maximum sequence number in the Lucene commit when
opening the engine. Note that there can still be gaps in the history
in the translog but we do not address those here.
of peer recovery.
Lastly, this work uncovered some bugs in the assignment of sequence
numbers on replica operations:
very likely introduced as a result of resolving merge conflicts
version conflict with a previous operation were never marked as
processed
Relates #10708